1 package org.apache.lucene.index;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.IOException;
21 import java.nio.file.Path;
22 import java.util.*;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.apache.lucene.analysis.MockAnalyzer;
30 import org.apache.lucene.document.Document;
31 import org.apache.lucene.document.Field;
32 import org.apache.lucene.search.IndexSearcher;
33 import org.apache.lucene.search.PhraseQuery;
34 import org.apache.lucene.search.Query;
35 import org.apache.lucene.search.ScoreDoc;
36 import org.apache.lucene.search.Sort;
37 import org.apache.lucene.search.SortField;
38 import org.apache.lucene.search.TermQuery;
39 import org.apache.lucene.search.TopDocs;
40 import org.apache.lucene.store.BaseDirectoryWrapper;
41 import org.apache.lucene.store.Directory;
42 import org.apache.lucene.util.Bits;
43 import org.apache.lucene.util.BytesRef;
44 import org.apache.lucene.util.FailOnNonBulkMergesInfoStream;
45 import org.apache.lucene.util.IOUtils;
46 import org.apache.lucene.util.LineFileDocs;
47 import org.apache.lucene.util.LuceneTestCase;
48 import org.apache.lucene.util.NamedThreadFactory;
49 import org.apache.lucene.util.PrintStreamInfoStream;
50 import org.apache.lucene.util.TestUtil;
51
52
53
54
55
56
57
58 public abstract class ThreadedIndexingAndSearchingTestCase extends LuceneTestCase {
59
/** Set to true by an indexing or search thread that hits an exception; the thread loops in this class stop once it is set. */
protected final AtomicBoolean failed = new AtomicBoolean();
/** Running count of documents added or updated by the indexing threads. */
protected final AtomicInteger addCount = new AtomicInteger();
/** Running count of documents the indexing threads have recorded as deleted. */
protected final AtomicInteger delCount = new AtomicInteger();
/** Counter the indexing threads draw new pack IDs from. */
protected final AtomicInteger packCount = new AtomicInteger();

/** Directory holding the test index; assigned in runTest from getDirectory(...). */
protected Directory dir;
/** Writer shared by the default add/update/delete hooks; created in runTest. */
protected IndexWriter writer;
67
68 private static class SubDocs {
69 public final String packID;
70 public final List<String> subIDs;
71 public boolean deleted;
72
73 public SubDocs(String packID, List<String> subIDs) {
74 this.packID = packID;
75 this.subIDs = subIDs;
76 }
77 }
78
79
/**
 * Supplies the searcher a search thread should use right now.
 * runSearchThreads calls this once per loop iteration and hands the
 * result back to {@link #releaseSearcher} when the iteration ends.
 */
protected abstract IndexSearcher getCurrentSearcher() throws Exception;
81
/**
 * Supplies the searcher used for the end-of-test verification.
 * runTest calls this after all indexing threads have been joined and
 * later passes the result to {@link #releaseSearcher}.
 */
protected abstract IndexSearcher getFinalSearcher() throws Exception;
83
/**
 * Hook called when a searcher obtained from getCurrentSearcher or
 * getFinalSearcher is no longer needed. The default does nothing.
 */
protected void releaseSearcher(IndexSearcher s) throws Exception {
}
86
87
/**
 * Runs the searching half of the test. runTest calls this after the
 * indexing threads have been started.
 *
 * @param es executor created by runTest, or null (runTest randomly passes either)
 * @param stopTime System.currentTimeMillis()-based deadline shared with the indexing threads
 */
protected abstract void doSearching(ExecutorService es, long stopTime) throws Exception;
89
/**
 * Hook that lets a subclass substitute or wrap the directory runTest
 * creates; whatever is returned becomes {@code dir}. The default returns
 * the argument unchanged.
 */
protected Directory getDirectory(Directory in) {
  return in;
}
93
/**
 * Hook used by the indexing threads to replace a pack of documents.
 * The default forwards both arguments to {@code writer.updateDocuments}.
 *
 * @param id   term identifying the pack being replaced (the threads pass a "packID" term)
 * @param docs the replacement documents
 */
protected void updateDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
  writer.updateDocuments(id, docs);
}
97
/**
 * Hook used by the indexing threads to add a new pack of documents.
 * The default forwards {@code docs} to {@code writer.addDocuments}.
 *
 * @param id   term identifying the pack (the threads pass a "packID" term); not used by this default implementation
 * @param docs the documents to add
 */
protected void addDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
  writer.addDocuments(docs);
}
101
/**
 * Hook used by the indexing threads to add a single document.
 * The default forwards {@code doc} to {@code writer.addDocument}.
 *
 * @param id  term identifying the document (the threads pass a "docid" term); not used by this default implementation
 * @param doc the document to add
 */
protected void addDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
  writer.addDocument(doc);
}
105
/**
 * Hook used by the indexing threads to update a single document.
 * The default forwards both arguments to {@code writer.updateDocument}.
 *
 * @param term term identifying the document (the threads pass a "docid" term)
 * @param doc  the new document
 */
protected void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws Exception {
  writer.updateDocument(term, doc);
}
109
/**
 * Hook used by the indexing threads to delete by term (either a "docid"
 * or a "packID" term). The default forwards to {@code writer.deleteDocuments}.
 */
protected void deleteDocuments(Term term) throws Exception {
  writer.deleteDocuments(term);
}
113
/**
 * Hook invoked by each indexing thread, on that thread, after its
 * indexing loop has ended. The default does nothing.
 */
protected void doAfterIndexingThreadDone() {
}
116
117 private Thread[] launchIndexingThreads(final LineFileDocs docs,
118 int numThreads,
119 final long stopTime,
120 final Set<String> delIDs,
121 final Set<String> delPackIDs,
122 final List<SubDocs> allSubDocs) {
123 final Thread[] threads = new Thread[numThreads];
124 for(int thread=0;thread<numThreads;thread++) {
125 threads[thread] = new Thread() {
126 @Override
127 public void run() {
128
129 final List<String> toDeleteIDs = new ArrayList<>();
130 final List<SubDocs> toDeleteSubDocs = new ArrayList<>();
131 while(System.currentTimeMillis() < stopTime && !failed.get()) {
132 try {
133
134
135
136 if (LuceneTestCase.TEST_NIGHTLY && random().nextInt(6) == 3) {
137 if (VERBOSE) {
138 System.out.println(Thread.currentThread().getName() + ": now long sleep");
139 }
140 Thread.sleep(TestUtil.nextInt(random(), 50, 500));
141 }
142
143
144 if (random().nextInt(7) == 5) {
145 Thread.sleep(TestUtil.nextInt(random(), 1, 10));
146 if (VERBOSE) {
147 System.out.println(Thread.currentThread().getName() + ": done sleep");
148 }
149 }
150
151 Document doc = docs.nextDoc();
152 if (doc == null) {
153 break;
154 }
155
156
157 final String addedField;
158 if (random().nextBoolean()) {
159 addedField = "extra" + random().nextInt(40);
160 doc.add(newTextField(addedField, "a random field", Field.Store.YES));
161 } else {
162 addedField = null;
163 }
164
165 if (random().nextBoolean()) {
166
167 if (random().nextBoolean()) {
168
169 final String packID;
170 final SubDocs delSubDocs;
171 if (toDeleteSubDocs.size() > 0 && random().nextBoolean()) {
172 delSubDocs = toDeleteSubDocs.get(random().nextInt(toDeleteSubDocs.size()));
173 assert !delSubDocs.deleted;
174 toDeleteSubDocs.remove(delSubDocs);
175
176 packID = delSubDocs.packID;
177 } else {
178 delSubDocs = null;
179
180 packID = packCount.getAndIncrement() + "";
181 }
182
183 final Field packIDField = newStringField("packID", packID, Field.Store.YES);
184 final List<String> docIDs = new ArrayList<>();
185 final SubDocs subDocs = new SubDocs(packID, docIDs);
186 final List<Document> docsList = new ArrayList<>();
187
188 allSubDocs.add(subDocs);
189 doc.add(packIDField);
190 docsList.add(TestUtil.cloneDocument(doc));
191 docIDs.add(doc.get("docid"));
192
193 final int maxDocCount = TestUtil.nextInt(random(), 1, 10);
194 while(docsList.size() < maxDocCount) {
195 doc = docs.nextDoc();
196 if (doc == null) {
197 break;
198 }
199 docsList.add(TestUtil.cloneDocument(doc));
200 docIDs.add(doc.get("docid"));
201 }
202 addCount.addAndGet(docsList.size());
203
204 final Term packIDTerm = new Term("packID", packID);
205
206 if (delSubDocs != null) {
207 delSubDocs.deleted = true;
208 delIDs.addAll(delSubDocs.subIDs);
209 delCount.addAndGet(delSubDocs.subIDs.size());
210 if (VERBOSE) {
211 System.out.println(Thread.currentThread().getName() + ": update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
212 }
213 updateDocuments(packIDTerm, docsList);
214 } else {
215 if (VERBOSE) {
216 System.out.println(Thread.currentThread().getName() + ": add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
217 }
218 addDocuments(packIDTerm, docsList);
219 }
220 doc.removeField("packID");
221
222 if (random().nextInt(5) == 2) {
223 if (VERBOSE) {
224 System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
225 }
226 toDeleteSubDocs.add(subDocs);
227 }
228
229 } else {
230
231 final String docid = doc.get("docid");
232 if (VERBOSE) {
233 System.out.println(Thread.currentThread().getName() + ": add doc docid:" + docid);
234 }
235 addDocument(new Term("docid", docid), doc);
236 addCount.getAndIncrement();
237
238 if (random().nextInt(5) == 3) {
239 if (VERBOSE) {
240 System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
241 }
242 toDeleteIDs.add(docid);
243 }
244 }
245 } else {
246
247
248
249
250 if (VERBOSE) {
251 System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
252 }
253 final String docid = doc.get("docid");
254 updateDocument(new Term("docid", docid), doc);
255 addCount.getAndIncrement();
256
257 if (random().nextInt(5) == 3) {
258 if (VERBOSE) {
259 System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
260 }
261 toDeleteIDs.add(docid);
262 }
263 }
264
265 if (random().nextInt(30) == 17) {
266 if (VERBOSE) {
267 System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
268 }
269 for(String id : toDeleteIDs) {
270 if (VERBOSE) {
271 System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
272 }
273 deleteDocuments(new Term("docid", id));
274 }
275 final int count = delCount.addAndGet(toDeleteIDs.size());
276 if (VERBOSE) {
277 System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
278 }
279 delIDs.addAll(toDeleteIDs);
280 toDeleteIDs.clear();
281
282 for(SubDocs subDocs : toDeleteSubDocs) {
283 assert !subDocs.deleted;
284 delPackIDs.add(subDocs.packID);
285 deleteDocuments(new Term("packID", subDocs.packID));
286 subDocs.deleted = true;
287 if (VERBOSE) {
288 System.out.println(Thread.currentThread().getName() + ": del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
289 }
290 delIDs.addAll(subDocs.subIDs);
291 delCount.addAndGet(subDocs.subIDs.size());
292 }
293 toDeleteSubDocs.clear();
294 }
295 if (addedField != null) {
296 doc.removeField(addedField);
297 }
298 } catch (Throwable t) {
299 System.out.println(Thread.currentThread().getName() + ": hit exc");
300 t.printStackTrace();
301 failed.set(true);
302 throw new RuntimeException(t);
303 }
304 }
305 if (VERBOSE) {
306 System.out.println(Thread.currentThread().getName() + ": indexing done");
307 }
308
309 doAfterIndexingThreadDone();
310 }
311 };
312 threads[thread].start();
313 }
314
315 return threads;
316 }
317
318 protected void runSearchThreads(final long stopTimeMS) throws Exception {
319 final int numThreads = TestUtil.nextInt(random(), 1, 5);
320 final Thread[] searchThreads = new Thread[numThreads];
321 final AtomicInteger totHits = new AtomicInteger();
322
323
324 final AtomicInteger totTermCount = new AtomicInteger(100);
325
326
327 for(int thread=0;thread<searchThreads.length;thread++) {
328 searchThreads[thread] = new Thread() {
329 @Override
330 public void run() {
331 if (VERBOSE) {
332 System.out.println(Thread.currentThread().getName() + ": launch search thread");
333 }
334 while (System.currentTimeMillis() < stopTimeMS && !failed.get()) {
335 try {
336 final IndexSearcher s = getCurrentSearcher();
337 try {
338
339
340
341 for(final LeafReaderContext sub : s.getIndexReader().leaves()) {
342 SegmentReader segReader = (SegmentReader) sub.reader();
343 Map<String,String> diagnostics = segReader.getSegmentInfo().info.getDiagnostics();
344 assertNotNull(diagnostics);
345 String source = diagnostics.get("source");
346 assertNotNull(source);
347 if (source.equals("merge")) {
348 assertTrue("sub reader " + sub + " wasn't warmed: warmed=" + warmed + " diagnostics=" + diagnostics + " si=" + segReader.getSegmentInfo(),
349 !assertMergedSegmentsWarmed || warmed.containsKey(segReader.core));
350 }
351 }
352 if (s.getIndexReader().numDocs() > 0) {
353 smokeTestSearcher(s);
354 Fields fields = MultiFields.getFields(s.getIndexReader());
355 Terms terms = fields.terms("body");
356 if (terms == null) {
357 continue;
358 }
359 TermsEnum termsEnum = terms.iterator();
360 int seenTermCount = 0;
361 int shift;
362 int trigger;
363 if (totTermCount.get() < 30) {
364 shift = 0;
365 trigger = 1;
366 } else {
367 trigger = totTermCount.get()/30;
368 shift = random().nextInt(trigger);
369 }
370 while (System.currentTimeMillis() < stopTimeMS) {
371 BytesRef term = termsEnum.next();
372 if (term == null) {
373 totTermCount.set(seenTermCount);
374 break;
375 }
376 seenTermCount++;
377
378 if ((seenTermCount + shift) % trigger == 0) {
379
380
381
382 totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", BytesRef.deepCopyOf(term)))));
383 }
384 }
385
386
387
388 }
389 } finally {
390 releaseSearcher(s);
391 }
392 } catch (Throwable t) {
393 System.out.println(Thread.currentThread().getName() + ": hit exc");
394 failed.set(true);
395 t.printStackTrace(System.out);
396 throw new RuntimeException(t);
397 }
398 }
399 }
400 };
401 searchThreads[thread].start();
402 }
403
404 for(Thread thread : searchThreads) {
405 thread.join();
406 }
407
408 if (VERBOSE) {
409 System.out.println("TEST: DONE search: totHits=" + totHits);
410 }
411 }
412
/**
 * Hook invoked by runTest right after the IndexWriter has been created,
 * before any indexing thread starts. The default does nothing.
 *
 * @param es executor created by runTest, or null
 */
protected void doAfterWriter(ExecutorService es) throws Exception {
}
415
/**
 * Hook invoked by runTest after final verification and a commit, just
 * before the writer is committed again and closed. The default does nothing.
 */
protected void doClose() throws Exception {
}
418
/** When true, search threads assert that every segment whose "source" diagnostic is "merge" has an entry in {@code warmed}. */
protected boolean assertMergedSegmentsWarmed = true;

/** Records the core of every segment reader passed to the warmer installed by runTest (weak keys, synchronized wrapper). */
private final Map<SegmentCoreReaders,Boolean> warmed = Collections.synchronizedMap(new WeakHashMap<SegmentCoreReaders,Boolean>());
422
423 public void runTest(String testName) throws Exception {
424
425 failed.set(false);
426 addCount.set(0);
427 delCount.set(0);
428 packCount.set(0);
429
430 final long t0 = System.currentTimeMillis();
431
432 Random random = new Random(random().nextLong());
433 final LineFileDocs docs = new LineFileDocs(random, true);
434 final Path tempDir = createTempDir(testName);
435 dir = getDirectory(newMockFSDirectory(tempDir));
436 if (dir instanceof BaseDirectoryWrapper) {
437 ((BaseDirectoryWrapper) dir).setCheckIndexOnClose(false);
438 }
439 MockAnalyzer analyzer = new MockAnalyzer(random());
440 analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
441 final IndexWriterConfig conf = newIndexWriterConfig(analyzer).setCommitOnClose(false);
442 conf.setInfoStream(new FailOnNonBulkMergesInfoStream());
443 if (conf.getMergePolicy() instanceof MockRandomMergePolicy) {
444 ((MockRandomMergePolicy)conf.getMergePolicy()).setDoNonBulkMerges(false);
445 }
446
447 if (LuceneTestCase.TEST_NIGHTLY) {
448
449
450
451 MergePolicy mp = conf.getMergePolicy();
452 if (mp instanceof TieredMergePolicy) {
453 ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
454 } else if (mp instanceof LogByteSizeMergePolicy) {
455 ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
456 } else if (mp instanceof LogMergePolicy) {
457 ((LogMergePolicy) mp).setMaxMergeDocs(100000);
458 }
459
460
461
462 conf.setUseCompoundFile(true);
463 mp.setNoCFSRatio(Math.max(0.25d, mp.getNoCFSRatio()));
464 }
465
466 conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
467 @Override
468 public void warm(LeafReader reader) throws IOException {
469 if (VERBOSE) {
470 System.out.println("TEST: now warm merged reader=" + reader);
471 }
472 warmed.put(((SegmentReader) reader).core, Boolean.TRUE);
473 final int maxDoc = reader.maxDoc();
474 final Bits liveDocs = reader.getLiveDocs();
475 int sum = 0;
476 final int inc = Math.max(1, maxDoc/50);
477 for(int docID=0;docID<maxDoc;docID += inc) {
478 if (liveDocs == null || liveDocs.get(docID)) {
479 final Document doc = reader.document(docID);
480 sum += doc.getFields().size();
481 }
482 }
483
484 IndexSearcher searcher = newSearcher(reader);
485 sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
486
487 if (VERBOSE) {
488 System.out.println("TEST: warm visited " + sum + " fields");
489 }
490 }
491 });
492
493 if (VERBOSE) {
494 conf.setInfoStream(new PrintStreamInfoStream(System.out) {
495 @Override
496 public void message(String component, String message) {
497 if ("TP".equals(component)) {
498 return;
499 }
500 super.message(component, message);
501 }
502 });
503 }
504 writer = new IndexWriter(dir, conf);
505 TestUtil.reduceOpenFiles(writer);
506
507 final ExecutorService es = random().nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory(testName));
508
509 doAfterWriter(es);
510
511 final int NUM_INDEX_THREADS = TestUtil.nextInt(random(), 2, 4);
512
513 final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
514
515 final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
516 final Set<String> delPackIDs = Collections.synchronizedSet(new HashSet<String>());
517 final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
518
519 final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
520
521 final Thread[] indexThreads = launchIndexingThreads(docs, NUM_INDEX_THREADS, stopTime, delIDs, delPackIDs, allSubDocs);
522
523 if (VERBOSE) {
524 System.out.println("TEST: DONE start " + NUM_INDEX_THREADS + " indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
525 }
526
527
528 Thread.sleep(100);
529
530 doSearching(es, stopTime);
531
532 if (VERBOSE) {
533 System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
534 }
535
536 for(Thread thread : indexThreads) {
537 thread.join();
538 }
539
540 if (VERBOSE) {
541 System.out.println("TEST: done join indexing threads [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
542 }
543
544 final IndexSearcher s = getFinalSearcher();
545 if (VERBOSE) {
546 System.out.println("TEST: finalSearcher=" + s);
547 }
548
549 assertFalse(failed.get());
550
551 boolean doFail = false;
552
553
554 for(String id : delIDs) {
555 final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
556 if (hits.totalHits != 0) {
557 System.out.println("doc id=" + id + " is supposed to be deleted, but got " + hits.totalHits + " hits; first docID=" + hits.scoreDocs[0].doc);
558 doFail = true;
559 }
560 }
561
562
563 for(String id : delPackIDs) {
564 final TopDocs hits = s.search(new TermQuery(new Term("packID", id)), 1);
565 if (hits.totalHits != 0) {
566 System.out.println("packID=" + id + " is supposed to be deleted, but got " + hits.totalHits + " matches");
567 doFail = true;
568 }
569 }
570
571
572 for(SubDocs subDocs : allSubDocs) {
573 TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
574 if (!subDocs.deleted) {
575
576 if (hits.totalHits != subDocs.subIDs.size()) {
577 System.out.println("packID=" + subDocs.packID + ": expected " + subDocs.subIDs.size() + " hits but got " + hits.totalHits);
578 doFail = true;
579 } else {
580 int lastDocID = -1;
581 int startDocID = -1;
582 for(ScoreDoc scoreDoc : hits.scoreDocs) {
583 final int docID = scoreDoc.doc;
584 if (lastDocID != -1) {
585 assertEquals(1+lastDocID, docID);
586 } else {
587 startDocID = docID;
588 }
589 lastDocID = docID;
590 final Document doc = s.doc(docID);
591 assertEquals(subDocs.packID, doc.get("packID"));
592 }
593
594 lastDocID = startDocID - 1;
595 for(String subID : subDocs.subIDs) {
596 hits = s.search(new TermQuery(new Term("docid", subID)), 1);
597 assertEquals(1, hits.totalHits);
598 final int docID = hits.scoreDocs[0].doc;
599 if (lastDocID != -1) {
600 assertEquals(1+lastDocID, docID);
601 }
602 lastDocID = docID;
603 }
604 }
605 } else {
606
607
608
609 for(String subID : subDocs.subIDs) {
610 assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
611 }
612 }
613 }
614
615
616
617 final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
618 docs.close();
619
620 for(int id=0;id<endID;id++) {
621 String stringID = ""+id;
622 if (!delIDs.contains(stringID)) {
623 final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
624 if (hits.totalHits != 1) {
625 System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits + "; delIDs=" + delIDs);
626 doFail = true;
627 }
628 }
629 }
630 assertFalse(doFail);
631
632 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
633 releaseSearcher(s);
634
635 writer.commit();
636
637 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
638
639 doClose();
640
641 try {
642 writer.commit();
643 } finally {
644 writer.close();
645 }
646
647
648
649
650 if (es != null) {
651 es.shutdown();
652 es.awaitTermination(1, TimeUnit.SECONDS);
653 }
654
655 TestUtil.checkIndex(dir);
656 dir.close();
657 IOUtils.rm(tempDir);
658
659 if (VERBOSE) {
660 System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
661 }
662 }
663
664 private int runQuery(IndexSearcher s, Query q) throws Exception {
665 s.search(q, 10);
666 int hitCount = s.search(q, 10, new Sort(new SortField("titleDV", SortField.Type.STRING))).totalHits;
667 final Sort dvSort = new Sort(new SortField("titleDV", SortField.Type.STRING));
668 int hitCount2 = s.search(q, 10, dvSort).totalHits;
669 assertEquals(hitCount, hitCount2);
670 return hitCount;
671 }
672
673 protected void smokeTestSearcher(IndexSearcher s) throws Exception {
674 runQuery(s, new TermQuery(new Term("body", "united")));
675 runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
676 PhraseQuery pq = new PhraseQuery("body", "united", "states");
677 runQuery(s, pq);
678 }
679 }